tutorials-and-examples/example-notebooks/Example - Azure Storage VT Hash Lookup.ipynb (182 lines of code) (raw):

{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<h1>Azure Storage VirusTotal Lookup</h1>\n",
    "<p>This sample notebook will extract all MD5 file hashes from Azure Storage logs and then perform a VirusTotal lookup using MSTICPy TILookup()</p>\n",
    "\n",
    "<h2>Supported Azure Storage containers</h2>\n",
    "<ul>\n",
    "<li><b>StorageBlobLogs</b> - Azure Blob Storage</li>\n",
    "<li><b>StorageFileLogs</b> - Azure File Storage</li>\n",
    "</ul>\n",
    "\n",
    "<h3>Step 1 - Imports</h3>"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pathlib import Path\n",
    "import os\n",
    "import sys\n",
    "import warnings\n",
    "from IPython.display import display, HTML, Markdown\n",
    "import importlib\n",
    "import ipywidgets as widgets\n",
    "from ipywidgets import HBox\n",
    "\n",
    "# Declared Python / MSTICPy version requirements\n",
    "# (informational only - nothing in this notebook enforces them).\n",
    "REQ_PYTHON_VER = \"3.10\"\n",
    "REQ_MSTICPY_VER = \"2.12.0\"\n",
    "\n",
    "display(HTML(\"<h3>Starting Notebook setup...</h3>\"))\n",
    "\n",
    "# If not using Azure Notebooks, install msticpy with\n",
    "# %pip install msticpy\n",
    "from msticpy.nbtools import nbinit\n",
    "\n",
    "# NOTE(review): later cells use WorkspaceConfig, QueryProvider, TILookup and md\n",
    "# without importing them - presumably init_notebook adds them to the namespace\n",
    "# passed here. Confirm against the installed msticpy version.\n",
    "nbinit.init_notebook(\n",
    "    namespace=globals(),\n",
    "    extra_imports=[\"ipwhois, IPWhois\"]\n",
    ");\n",
    "\n",
    "display(HTML(\"<h3>Step 2 - Select Workspace</h3>\"))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Show two pickers side by side: the look-back window for the log query\n",
    "# and the workspace to run it against.\n",
    "try:\n",
    "    pick_time_range = widgets.Dropdown(\n",
    "        options=['3d', '7d', '14d', '30d', '60d'],\n",
    "        description=\"Time range\",\n",
    "        disabled=False,\n",
    "    )\n",
    "\n",
    "    workspaces_available = WorkspaceConfig().list_workspaces()\n",
    "\n",
    "    target_workspace = widgets.Dropdown(\n",
    "        options=list(workspaces_available.keys()),\n",
    "        description=\"Workspace\",\n",
    "    )\n",
    "\n",
    "    display(HBox([pick_time_range, target_workspace]))\n",
    "\n",
    "    display(HTML(\"<h3>Step 3 - Connect to Workspace</h3>\"))\n",
    "except RuntimeError:\n",
    "    md(\"\"\"You do not have any Workspaces configured in your config files.\n",
    "    Please run the https://github.com/Azure/Azure-Sentinel-Notebooks/blob/master/ConfiguringNotebookEnvironment.ipynb\n",
    "    to setup these files before proceeding\"\"\" ,'bold')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Read the selections made in the widgets above.\n",
    "time_range = pick_time_range.value\n",
    "workspace_name = target_workspace.value\n",
    "# Collect Microsoft Sentinel Workspace Details from our config file and use them to connect\n",
    "try:\n",
    "    # Update to WorkspaceConfig(workspace=\"WORKSPACE_NAME\") to get alerts from a Workspace other than your default one.\n",
    "    # Run WorkspaceConfig().list_workspaces() to see a list of configured workspaces\n",
    "    ws_config = WorkspaceConfig(workspace=workspace_name)\n",
    "    ws_id = ws_config['workspace_id']\n",
    "    ten_id = ws_config['tenant_id']\n",
    "    md(\"Workspace details collected from config file\")\n",
    "    qry_prov = QueryProvider(data_environment='LogAnalytics')\n",
    "    qry_prov.connect(connection_str=ws_config.code_connect_str)\n",
    "\n",
    "    display(HTML(\"<h3>Step 4 - Collect Storage Logs Data</h3>\"))\n",
    "except RuntimeError:\n",
    "    md(\"\"\"You do not have any Workspaces configured in your config files.\n",
    "    Please run the https://github.com/Azure/Azure-Sentinel-Notebooks/blob/master/ConfiguringNotebookEnvironment.ipynb\n",
    "    to setup these files before proceeding\"\"\" ,'bold')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# KQL: take PutBlob / PutRange operations from both storage log tables within\n",
    "# the selected time range. ResponseMd5 is base64-decoded into an array, each\n",
    "# element is converted to a zero-padded two-character hex string, and the\n",
    "# pieces are re-joined per request into Md5Hash.\n",
    "#\n",
    "# The string is a *raw* f-string: {time_range} is still interpolated, while the\n",
    "# regex backslashes (\\/ \\w \\- \\?) are passed through to KQL unchanged instead\n",
    "# of being treated as (invalid) Python escape sequences.\n",
    "alert_summary_query = rf'''\n",
    "union\n",
    "StorageFileLogs,\n",
    "StorageBlobLogs\n",
    "| where TimeGenerated > ago({time_range})\n",
    "| where OperationName =~ \"PutBlob\" or OperationName =~ \"PutRange\"\n",
    "| extend ClientIP = tostring(split(CallerIpAddress, \":\", 0)[0])\n",
    "| extend FileName = extract(@\"\\/([\\w\\-. ]+)\\?\", 1, Uri)\n",
    "| extend base64Char = base64_decode_toarray(ResponseMd5)\n",
    "| mv-expand base64Char\n",
    "| extend hexChar = tohex(toint(base64Char))\n",
    "| extend hexChar = iff(strlen(hexChar) < 2, strcat(\"0\", hexChar), hexChar)\n",
    "| extend SourceTable = iff(OperationName has \"range\", \"StorageFileLogs\", \"StorageBlobLogs\")\n",
    "| summarize make_list(hexChar) by CorrelationId, ResponseMd5, FileName, AccountName, TimeGenerated, RequestBodySize, ClientIP, SourceTable\n",
    "| extend Md5Hash = strcat_array(list_hexChar, \"\")\n",
    "| project TimeGenerated, FileName, ClientIP, SourceTable, Md5Hash, AccountName, RequestBodySize\n",
    "'''\n",
    "\n",
    "alertout = qry_prov.exec_query(alert_summary_query)\n",
    "\n",
    "# Treat a missing result (None, or an object without `.empty`) the same as an\n",
    "# empty DataFrame so this cell reports \"no data\" instead of raising.\n",
    "if getattr(alertout, \"empty\", True):\n",
    "    display(HTML(\"<h4>ERROR: No File Data</h4>\"))\n",
    "    display(HTML(\"<p>No files were found in your storage account logs.</p>\"))\n",
    "else:\n",
    "    display(HTML(\"<h4>Storage Log File Data</h4>\"))\n",
    "    display(HTML(\"<p>The following file information was collected from your storage account</p>\"))\n",
    "    display(alertout)\n",
    "    display(HTML(\"<h3>Step 5 - Query VirusTotal</h3>\"))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Collect the distinct, non-empty MD5 hashes returned by the storage log query.\n",
    "if getattr(alertout, \"empty\", True):\n",
    "    hashset = []\n",
    "else:\n",
    "    hashset = [md5 for md5 in alertout.Md5Hash.dropna().unique().tolist() if md5]\n",
    "\n",
    "if not hashset:\n",
    "    # Nothing to submit - skip the lookup rather than reporting a misleading \"Success\".\n",
    "    display(HTML(\"<p>No MD5 hashes were found to look up - skipping the VirusTotal query.</p>\"))\n",
    "else:\n",
    "    print(f\"Querying VirusTotal for {len(hashset)} hashes\")\n",
    "\n",
    "    ti_lookup = TILookup(providers=[\"VirusTotal\"])\n",
    "    # Show which TI providers are loaded before running the lookup.\n",
    "    print(*ti_lookup.provider_status, sep=\"\\n\")\n",
    "    result = ti_lookup.lookup_iocs(data=hashset, providers=[\"VirusTotal\"])\n",
    "\n",
    "    display(HTML(\"<h3>Success - VirusTotal Results</h3>\"))\n",
    "    display(result)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "msticpy",
   "language": "python",
   "name": "msticpy"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.8.3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}